/*-------------------------------------------------------------------------
 *
 * generic_xlog.c
 *     Implementation of generic xlog records.
 *
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/access/transam/generic_xlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/bufmask.h"
#include "access/generic_xlog.h"
#include "access/xlogutils.h"
#include "miscadmin.h"
#include "utils/memutils.h"

/*-------------------------------------------------------------------------
 * Internally, a delta between pages consists of a set of fragments.  Each
 * fragment represents changes made in a given region of a page.  A fragment
 * is made up as follows:
 *
 * - offset of page region (OffsetNumber)
 * - length of page region (OffsetNumber)
 * - data - the data to place into the region ('length' number of bytes)
 *
 * Unchanged regions of a page are not represented in its delta.  As a result,
 * a delta can be more compact than the full page image.  But having an
 * unchanged region between two fragments that is smaller than the fragment
 * header (offset+length) does not pay off in terms of the overall size of
 * the delta.  For this reason, we merge adjacent fragments if the unchanged
 * region between them is <= MATCH_THRESHOLD bytes.
 *
 * We do not bother to merge fragments across the "lower" and "upper" parts
 * of a page; it's very seldom the case that pd_lower and pd_upper are within
 * MATCH_THRESHOLD bytes of each other, and handling that infrequent case
 * would complicate and slow down the delta-computation code unduly.
 * Therefore, the worst-case delta size includes two fragment headers plus
 * a full page's worth of data.
 *-------------------------------------------------------------------------
 */
#define FRAGMENT_HEADER_SIZE    (2 * sizeof(OffsetNumber))
#define MATCH_THRESHOLD            FRAGMENT_HEADER_SIZE
#define MAX_DELTA_SIZE            (BLCKSZ + 2 * FRAGMENT_HEADER_SIZE)

/* Struct of generic xlog data for single page */
typedef struct
{
    Buffer        buffer;            /* registered buffer */
    int            flags;            /* flags for this buffer */
    int            deltaLen;        /* space consumed in delta field */
    char       *image;            /* copy of page image for modification, do not
                                 * do it in-place to have aligned memory chunk */
    char        delta[MAX_DELTA_SIZE];    /* delta between page images */
} PageData;

/* State of generic xlog record construction */
struct GenericXLogState
{
    /*
     * page's images. Should be first in this struct to have MAXALIGN'ed
     * images addresses, because some code working with pages directly aligns
     * addresses, not offsets from beginning of page
     */
    char        images[MAX_GENERIC_XLOG_PAGES * BLCKSZ];
    PageData    pages[MAX_GENERIC_XLOG_PAGES];
    bool        isLogged;
};

static void writeFragment(PageData *pageData, OffsetNumber offset,
              OffsetNumber len, const char *data);
static void computeRegionDelta(PageData *pageData,
                   const char *curpage, const char *targetpage,
                   int targetStart, int targetEnd,
                   int validStart, int validEnd);
static void computeDelta(PageData *pageData, Page curpage, Page targetpage);
static void applyPageRedo(Page page, const char *delta, Size deltaSize);


/*
 * Write next fragment into pageData's delta.
 *
 * The fragment has the given offset and length, and data points to the
 * actual data (of length length).
 */
static void
writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length,
              const char *data)
{
    char       *ptr = pageData->delta + pageData->deltaLen;

    /* Verify we have enough space */
    Assert(pageData->deltaLen + sizeof(offset) +
           sizeof(length) + length <= sizeof(pageData->delta));

    /* Write fragment data */
    memcpy(ptr, &offset, sizeof(offset));
    ptr += sizeof(offset);
    memcpy(ptr, &length, sizeof(length));
    ptr += sizeof(length);
    memcpy(ptr, data, length);
    ptr += length;

    pageData->deltaLen = ptr - pageData->delta;
}

/*
 * Compute the XLOG fragments needed to transform a region of curpage into the
 * corresponding region of targetpage, and append them to pageData's delta
 * field.  The region to transform runs from targetStart to targetEnd-1.
 * Bytes in curpage outside the range validStart to validEnd-1 should be
 * considered invalid, and always overwritten with target data.
 *
 * This function is a hot spot, so it's worth being as tense as possible
 * about the data-matching loops.
 */
static void
computeRegionDelta(PageData *pageData,
                   const char *curpage, const char *targetpage,
                   int targetStart, int targetEnd,
                   int validStart, int validEnd)
{// #lizard forgives
    int            i,
                loopEnd,
                fragmentBegin = -1,
                fragmentEnd = -1;

    /* Deal with any invalid start region by including it in first fragment */
    if (validStart > targetStart)
    {
        fragmentBegin = targetStart;
        targetStart = validStart;
    }

    /* We'll deal with any invalid end region after the main loop */
    loopEnd = Min(targetEnd, validEnd);

    /* Examine all the potentially matchable bytes */
    i = targetStart;
    while (i < loopEnd)
    {
        if (curpage[i] != targetpage[i])
        {
            /* On unmatched byte, start new fragment if not already in one */
            if (fragmentBegin < 0)
                fragmentBegin = i;
            /* Mark unmatched-data endpoint as uncertain */
            fragmentEnd = -1;
            /* Extend the fragment as far as possible in a tight loop */
            i++;
            while (i < loopEnd && curpage[i] != targetpage[i])
                i++;
            if (i >= loopEnd)
                break;
        }

        /* Found a matched byte, so remember end of unmatched fragment */
        fragmentEnd = i;

        /*
         * Extend the match as far as possible in a tight loop.  (On typical
         * workloads, this inner loop is the bulk of this function's runtime.)
         */
        i++;
        while (i < loopEnd && curpage[i] == targetpage[i])
            i++;

        /*
         * There are several possible cases at this point:
         *
         * 1. We have no unwritten fragment (fragmentBegin < 0).  There's
         * nothing to write; and it doesn't matter what fragmentEnd is.
         *
         * 2. We found more than MATCH_THRESHOLD consecutive matching bytes.
         * Dump out the unwritten fragment, stopping at fragmentEnd.
         *
         * 3. The match extends to loopEnd.  We'll do nothing here, exit the
         * loop, and then dump the unwritten fragment, after merging it with
         * the invalid end region if any.  If we don't so merge, fragmentEnd
         * establishes how much the final writeFragment call needs to write.
         *
         * 4. We found an unmatched byte before loopEnd.  The loop will repeat
         * and will enter the unmatched-byte stanza above.  So in this case
         * also, it doesn't matter what fragmentEnd is.  The matched bytes
         * will get merged into the continuing unmatched fragment.
         *
         * Only in case 3 do we reach the bottom of the loop with a meaningful
         * fragmentEnd value, which is why it's OK that we unconditionally
         * assign "fragmentEnd = i" above.
         */
        if (fragmentBegin >= 0 && i - fragmentEnd > MATCH_THRESHOLD)
        {
            writeFragment(pageData, fragmentBegin,
                          fragmentEnd - fragmentBegin,
                          targetpage + fragmentBegin);
            fragmentBegin = -1;
            fragmentEnd = -1;    /* not really necessary */
        }
    }

    /* Deal with any invalid end region by including it in final fragment */
    if (loopEnd < targetEnd)
    {
        if (fragmentBegin < 0)
            fragmentBegin = loopEnd;
        fragmentEnd = targetEnd;
    }

    /* Write final fragment if any */
    if (fragmentBegin >= 0)
    {
        if (fragmentEnd < 0)
            fragmentEnd = targetEnd;
        writeFragment(pageData, fragmentBegin,
                      fragmentEnd - fragmentBegin,
                      targetpage + fragmentBegin);
    }
}

/*
 * Compute the XLOG delta record needed to transform curpage into targetpage,
 * and store it in pageData's delta field.
 */
static void
computeDelta(PageData *pageData, Page curpage, Page targetpage)
{
    int            targetLower = ((PageHeader) targetpage)->pd_lower,
                targetUpper = ((PageHeader) targetpage)->pd_upper,
                curLower = ((PageHeader) curpage)->pd_lower,
                curUpper = ((PageHeader) curpage)->pd_upper;

    pageData->deltaLen = 0;

    /* Compute delta records for lower part of page ... */
    computeRegionDelta(pageData, curpage, targetpage,
                       0, targetLower,
                       0, curLower);
    /* ... and for upper part, ignoring what's between */
    computeRegionDelta(pageData, curpage, targetpage,
                       targetUpper, BLCKSZ,
                       curUpper, BLCKSZ);

    /*
     * If xlog debug is enabled, then check produced delta.  Result of delta
     * application to curpage should be equivalent to targetpage.
     */
#ifdef WAL_DEBUG
    if (XLOG_DEBUG)
    {
        char        tmp[BLCKSZ];

        memcpy(tmp, curpage, BLCKSZ);
        applyPageRedo(tmp, pageData->delta, pageData->deltaLen);
        if (memcmp(tmp, targetpage, targetLower) != 0 ||
            memcmp(tmp + targetUpper, targetpage + targetUpper,
                   BLCKSZ - targetUpper) != 0)
            elog(ERROR, "result of generic xlog apply does not match");
    }
#endif
}

/*
 * Start new generic xlog record for modifications to specified relation.
 */
GenericXLogState *
GenericXLogStart(Relation relation)
{
    GenericXLogState *state;
    int            i;

    state = (GenericXLogState *) palloc(sizeof(GenericXLogState));
    state->isLogged = RelationNeedsWAL(relation);

    for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
    {
        state->pages[i].image = state->images + BLCKSZ * i;
        state->pages[i].buffer = InvalidBuffer;
    }

    return state;
}

/*
 * Register new buffer for generic xlog record.
 *
 * Returns pointer to the page's image in the GenericXLogState, which
 * is what the caller should modify.
 *
 * If the buffer is already registered, just return its existing entry.
 * (It's not very clear what to do with the flags in such a case, but
 * for now we stay with the original flags.)
 */
Page
GenericXLogRegisterBuffer(GenericXLogState *state, Buffer buffer, int flags)
{
    int            block_id;

    /* Search array for existing entry or first unused slot */
    for (block_id = 0; block_id < MAX_GENERIC_XLOG_PAGES; block_id++)
    {
        PageData   *page = &state->pages[block_id];

        if (BufferIsInvalid(page->buffer))
        {
            /* Empty slot, so use it (there cannot be a match later) */
            page->buffer = buffer;
            page->flags = flags;
            memcpy(page->image, BufferGetPage(buffer), BLCKSZ);
            return (Page) page->image;
        }
        else if (page->buffer == buffer)
        {
            /*
             * Buffer is already registered.  Just return the image, which is
             * already prepared.
             */
            return (Page) page->image;
        }
    }

    elog(ERROR, "maximum number %d of generic xlog buffers is exceeded",
         MAX_GENERIC_XLOG_PAGES);
    /* keep compiler quiet */
    return NULL;
}

/*
 * Apply changes represented by GenericXLogState to the actual buffers,
 * and emit a generic xlog record.
 */
XLogRecPtr
GenericXLogFinish(GenericXLogState *state)
{// #lizard forgives
    XLogRecPtr    lsn;
    int            i;

    if (state->isLogged)
    {
        /* Logged relation: make xlog record in critical section. */
        XLogBeginInsert();

        START_CRIT_SECTION();

        for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
        {
            PageData   *pageData = &state->pages[i];
            Page        page;
            PageHeader    pageHeader;

            if (BufferIsInvalid(pageData->buffer))
                continue;

            page = BufferGetPage(pageData->buffer);
            pageHeader = (PageHeader) pageData->image;

            if (pageData->flags & GENERIC_XLOG_FULL_IMAGE)
            {
                /*
                 * A full-page image does not require us to supply any xlog
                 * data.  Just apply the image, being careful to zero the
                 * "hole" between pd_lower and pd_upper in order to avoid
                 * divergence between actual page state and what replay would
                 * produce.
                 */
                memcpy(page, pageData->image, pageHeader->pd_lower);
                memset(page + pageHeader->pd_lower, 0,
                       pageHeader->pd_upper - pageHeader->pd_lower);
                memcpy(page + pageHeader->pd_upper,
                       pageData->image + pageHeader->pd_upper,
                       BLCKSZ - pageHeader->pd_upper);

                XLogRegisterBuffer(i, pageData->buffer,
                                   REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
            }
            else
            {
                /*
                 * In normal mode, calculate delta and write it as xlog data
                 * associated with this page.
                 */
                computeDelta(pageData, page, (Page) pageData->image);

                /* Apply the image, with zeroed "hole" as above */
                memcpy(page, pageData->image, pageHeader->pd_lower);
                memset(page + pageHeader->pd_lower, 0,
                       pageHeader->pd_upper - pageHeader->pd_lower);
                memcpy(page + pageHeader->pd_upper,
                       pageData->image + pageHeader->pd_upper,
                       BLCKSZ - pageHeader->pd_upper);

                XLogRegisterBuffer(i, pageData->buffer, REGBUF_STANDARD);
                XLogRegisterBufData(i, pageData->delta, pageData->deltaLen);
            }
        }

        /* Insert xlog record */
        lsn = XLogInsert(RM_GENERIC_ID, 0);

        /* Set LSN and mark buffers dirty */
        for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
        {
            PageData   *pageData = &state->pages[i];

            if (BufferIsInvalid(pageData->buffer))
                continue;
            PageSetLSN(BufferGetPage(pageData->buffer), lsn);
            MarkBufferDirty(pageData->buffer);
        }
        END_CRIT_SECTION();
    }
    else
    {
        /* Unlogged relation: skip xlog-related stuff */
        START_CRIT_SECTION();
        for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
        {
            PageData   *pageData = &state->pages[i];

            if (BufferIsInvalid(pageData->buffer))
                continue;
            memcpy(BufferGetPage(pageData->buffer),
                   pageData->image,
                   BLCKSZ);
            /* We don't worry about zeroing the "hole" in this case */
            MarkBufferDirty(pageData->buffer);
        }
        END_CRIT_SECTION();
        /* We don't have a LSN to return, in this case */
        lsn = InvalidXLogRecPtr;
    }

    pfree(state);

    return lsn;
}

/*
 * Abort generic xlog record construction.  No changes are applied to buffers.
 *
 * Note: caller is responsible for releasing locks/pins on buffers, if needed.
 */
void
GenericXLogAbort(GenericXLogState *state)
{
    pfree(state);
}

/*
 * Apply delta to given page image.
 */
static void
applyPageRedo(Page page, const char *delta, Size deltaSize)
{
    const char *ptr = delta;
    const char *end = delta + deltaSize;

    while (ptr < end)
    {
        OffsetNumber offset,
                    length;

        memcpy(&offset, ptr, sizeof(offset));
        ptr += sizeof(offset);
        memcpy(&length, ptr, sizeof(length));
        ptr += sizeof(length);

        memcpy(page + offset, ptr, length);

        ptr += length;
    }
}

/*
 * Redo function for generic xlog record.
 */
void
generic_redo(XLogReaderState *record)
{
    XLogRecPtr    lsn = record->EndRecPtr;
    Buffer        buffers[MAX_GENERIC_XLOG_PAGES];
    uint8        block_id;

    /* Protect limited size of buffers[] array */
    Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES);

    /* Iterate over blocks */
    for (block_id = 0; block_id <= record->max_block_id; block_id++)
    {
        XLogRedoAction action;

        if (!XLogRecHasBlockRef(record, block_id))
        {
            buffers[block_id] = InvalidBuffer;
            continue;
        }

        action = XLogReadBufferForRedo(record, block_id, &buffers[block_id]);

        /* Apply redo to given block if needed */
        if (action == BLK_NEEDS_REDO)
        {
            Page        page;
            PageHeader    pageHeader;
            char       *blockDelta;
            Size        blockDeltaSize;

            page = BufferGetPage(buffers[block_id]);
            blockDelta = XLogRecGetBlockData(record, block_id, &blockDeltaSize);
            applyPageRedo(page, blockDelta, blockDeltaSize);

            /*
             * Since the delta contains no information about what's in the
             * "hole" between pd_lower and pd_upper, set that to zero to
             * ensure we produce the same page state that application of the
             * logged action by GenericXLogFinish did.
             */
            pageHeader = (PageHeader) page;
            memset(page + pageHeader->pd_lower, 0,
                   pageHeader->pd_upper - pageHeader->pd_lower);

            PageSetLSN(page, lsn);
            MarkBufferDirty(buffers[block_id]);
        }
    }

    /* Changes are done: unlock and release all buffers */
    for (block_id = 0; block_id <= record->max_block_id; block_id++)
    {
        if (BufferIsValid(buffers[block_id]))
            UnlockReleaseBuffer(buffers[block_id]);
    }
}

/*
 * Mask a generic page before performing consistency checks on it.
 */
void
generic_mask(char *page, BlockNumber blkno)
{
    mask_page_lsn(page);

    mask_unused_space(page);
}
